"""
RabbitMQ Reliable Message Consumer Module
"""

import asyncio
import aio_pika
import json
import logging
from datetime import datetime
from typing import Dict, Any, Optional, Callable, Set

from .config import config

logger = logging.getLogger(__name__)


class ReliableConsumer:
    """Reliable Message Consumer"""

    def __init__(self,
                 queue_name: Optional[str] = None,
                 consumer_name: Optional[str] = None,
                 message_handler: Optional[Callable] = None):
        """
        Initialize consumer
        
        Args:
            queue_name: Queue name, defaults to config value
            consumer_name: Consumer name
            message_handler: Custom message handler function
        """
        self.queue_name = queue_name or config.queue_name
        self.consumer_name = consumer_name or "reliable_consumer"
        self.message_handler = message_handler or self.default_message_handler
        self.connection = None
        self.channel = None
        self.queue = None
        self.processed_messages: Set[str] = set()  # Store processed message IDs to prevent duplicate processing

    async def connect(self):
        """Establish connection"""
        try:
            connection_config = config.get_connection_config()
            self.connection = await aio_pika.connect_robust(connection_config['uri'])
            self.channel = await self.connection.channel()

            # Set QoS to ensure only one message is processed at a time
            await self.channel.set_qos(prefetch_count=connection_config['prefetch_count'])

            # Declare queue (ensure queue exists)
            self.queue = await self.channel.declare_queue(
                self.queue_name,
                durable=True,
                auto_delete=False
            )

            logger.info(f"[Consumer-{self.consumer_name}] Connected, listening to queue: {self.queue_name}")

        except Exception as e:
            logger.error(f"[Consumer-{self.consumer_name}] Connection failed: {e}")
            raise

    async def process_message(self, message: aio_pika.IncomingMessage):
        """Core message processing logic"""
        try:
            # Parse message
            message_data = json.loads(message.body.decode('utf-8'))
            message_id = message_data.get('message_id')

            # Check if message has been processed before (idempotency check)
            if message_id in self.processed_messages:
                logger.warning("=" * 50)
                logger.warning(f"[Consumer-{self.consumer_name}] 🚫 Duplicate message detected, skipping:")
                logger.warning(f"[Consumer-{self.consumer_name}] Message ID: {message_id}")
                logger.warning(f"[Consumer-{self.consumer_name}] Message content: {json.dumps(message_data, ensure_ascii=False, indent=2)}")
                logger.warning(f"[Consumer-{self.consumer_name}] Total processed messages: {len(self.processed_messages)}")
                logger.warning("=" * 50)
                await message.ack()
                return

            logger.info(f"[Consumer-{self.consumer_name}] Starting to process message: {message_id}")
            logger.info(f"[Consumer-{self.consumer_name}] Message content: {message_data}")

            # Retry processing message directly
            success = await self.retry_process_message(message_data, message_id, 0)

            # Only record processed message ID after successful processing
            if success:
                self.processed_messages.add(message_id)
                # Acknowledge message
                await message.ack()
                logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed and acknowledged")
                logger.info(f"[Consumer-{self.consumer_name}] Current processed message count: {len(self.processed_messages)}")
            else:
                # Processing failed, don't record message ID, send to dead letter queue
                await self.send_to_dead_letter_queue(message, message_id, "Processing failed")
                await message.ack()  # Acknowledge message to avoid infinite retry

        except Exception as e:
            logger.error(f"[Consumer-{self.consumer_name}] Failed to process message: {e}")
            # Send directly to dead letter queue with error information
            message_data = json.loads(message.body.decode('utf-8'))
            message_id = message_data.get('message_id')
            await self.send_to_dead_letter_queue(message, message_id, str(e))
            await message.ack()  # Acknowledge message to avoid infinite retry

    async def default_message_handler(self, message_data: Dict[str, Any]):
        """Default message handler function"""
        # Simulate processing time
        await asyncio.sleep(1)

        # Decide whether to fail based on message type
        message_type = message_data.get('type', '')

        if message_type == 'will_fail':
            # Specific type of messages always fail, used for testing dead letter queue
            raise Exception(f"Simulated business processing failure: {message_data.get('content', '')}")
        else:
            pass

        logger.info(f"[Consumer-{self.consumer_name}] Business logic processing completed: {message_data.get('content', '')}")

    async def retry_process_message(self, message_data: Dict[str, Any], message_id: str, retry_count: int) -> bool:
        """Retry processing message directly"""
        max_retries = config.max_retries
        last_error = None

        for attempt in range(max_retries + 1):
            try:
                logger.info(f"[Consumer-{self.consumer_name}] Attempting to process message {message_id}, attempt {attempt + 1}")
                await self.message_handler(message_data)
                logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed successfully")
                return True  # Processing successful, return True

            except Exception as e:
                last_error = str(e)
                logger.warning(f"[Consumer-{self.consumer_name}] Message {message_id} failed on attempt {attempt + 1}: {e}")

                if attempt < max_retries:
                    # Wait for a while before retrying
                    await asyncio.sleep(1)
                else:
                    # All retries failed, return False
                    logger.error(f"[Consumer-{self.consumer_name}] Message {message_id} failed after {max_retries} retries: {last_error}")
                    return False

    async def send_to_dead_letter_queue(self, message: aio_pika.IncomingMessage, message_id: str,
                                        error_info: str = None):
        """发送消息到死信队列"""
        try:
            # 解析消息内容
            message_data = json.loads(message.body.decode('utf-8'))

            # 构建死信消息，包含原始消息和错误信息
            dead_letter_data = {
                'original_message': message_data,
                'error_info': error_info or '重试失败',
                'dead_letter_timestamp': datetime.now().isoformat(),
                'message_id': message_id,
                'consumer_name': self.consumer_name,
                'queue_name': self.queue_name
            }

            logger.error(f"[Consumer-{self.consumer_name}] Message sent to dead letter queue: {message_id}, error: {error_info}")

            # Create dead letter exchange and queue
            dead_letter_config = config.get_dead_letter_config()
            dead_letter_exchange = await self.channel.declare_exchange(
                dead_letter_config['dead_letter_exchange'],
                aio_pika.ExchangeType.DIRECT,
                durable=True
            )

            dead_letter_queue = await self.channel.declare_queue(
                dead_letter_config['dead_letter_queue'],
                durable=True,
                auto_delete=False
            )

            await dead_letter_queue.bind(
                dead_letter_exchange,
                routing_key=dead_letter_config['dead_letter_routing_key']
            )

            # Create dead letter message
            dead_letter_message = aio_pika.Message(
                body=json.dumps(dead_letter_data, ensure_ascii=False).encode('utf-8'),
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
                message_id=f"dead_letter_{message_id}"
            )

            # Send to dead letter queue
            await dead_letter_exchange.publish(
                dead_letter_message,
                routing_key=dead_letter_config['dead_letter_routing_key']
            )

            logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} sent to dead letter queue")

        except Exception as e:
            logger.error(f"[Consumer-{self.consumer_name}] Failed to send to dead letter queue: {e}")
            logger.error(f"[Consumer-{self.consumer_name}] Original message content: {message.body.decode('utf-8') if message.body else 'None'}")

    async def start_consuming(self):
        """开始消费消息"""
        self.consumer_tag = await self.queue.consume(self.process_message)
        logger.info(f"[消费者-{self.consumer_name}] 开始消费消息...")

        # 保持消费者运行
        await asyncio.Future()

    async def stop_consuming(self):
        """停止消费消息"""
        if self.queue and self.consumer_tag:
            await self.queue.cancel(self.consumer_tag)
            logger.info(f"[消费者-{self.consumer_name}] 已停止消费消息")

    async def close(self):
        """关闭连接"""
        try:
            await self.stop_consuming()
            if self.connection:
                await self.connection.close()
                logger.info(f"[消费者-{self.consumer_name}] 连接已关闭")
                # 打印最终统计信息
                self.print_processed_messages_stats()
        except Exception as e:
            logger.error(f"[消费者-{self.consumer_name}] 关闭连接时出错: {e}")

    def get_processed_messages_stats(self):
        """获取已处理消息的统计信息"""
        return {
            'total_processed': len(self.processed_messages),
            'processed_message_ids': list(self.processed_messages)
        }
    
    def print_processed_messages_stats(self):
        """打印已处理消息的统计信息"""
        stats = self.get_processed_messages_stats()
        logger.info("=" * 50)
        logger.info(f"[消费者-{self.consumer_name}] 已处理消息统计信息:")
        logger.info(f"[消费者-{self.consumer_name}] 总处理数量: {stats['total_processed']}")
        logger.info(f"[消费者-{self.consumer_name}] 已处理消息ID列表: {stats['processed_message_ids']}")
        logger.info("=" * 50)

    async def __aenter__(self):
        """异步上下文管理器入口"""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """异步上下文管理器出口"""
        await self.close()
